// Copyright 2016 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package model

import (
	"context"
	"crypto/subtle"
	"encoding/hex"
	"errors"
	"fmt"
	"math"
	"time"

	"google.golang.org/grpc/codes"

	ds "go.chromium.org/gae/service/datastore"
	"go.chromium.org/luci/common/clock"
	"go.chromium.org/luci/common/data/rand/cryptorand"
	"go.chromium.org/luci/common/logging"
	google_pb "go.chromium.org/luci/common/proto/google"
	dm "go.chromium.org/luci/dm/api/service/v1"
	"go.chromium.org/luci/grpc/grpcutil"
)

const ek = logging.ErrorKey

type invertedHexUint32 uint32

const invertedHexUint32RenderFmt = "%08x"

var _ ds.PropertyConverter = (*invertedHexUint32)(nil)

func (i *invertedHexUint32) ToProperty() (ret ds.Property, err error) {
	err = ret.SetValue(fmt.Sprintf(
		invertedHexUint32RenderFmt, (*i)^math.MaxUint32), ds.NoIndex)
	return
}

func (i *invertedHexUint32) FromProperty(p ds.Property) (err error) {
	sVal, err := p.Project(ds.PTString)
	if err != nil {
		return
	}

	tmp := uint32(0)
	if _, err = fmt.Sscanf(sVal.(string), invertedHexUint32RenderFmt, &tmp); err != nil {
		return
	}
	*i = invertedHexUint32(tmp ^ math.MaxUint32)
	return
}

// Execution represents either an ongoing execution on the Quest's specified
// distributor, or is a placeholder for an already-completed Execution.
type Execution struct {
	ID      invertedHexUint32 `gae:"$id"`
	Attempt *ds.Key           `gae:"$parent"`

	Created  time.Time
	Modified time.Time

	// DistributorConfigName is redundant with the Quest definition, but this
	// helps avoid extra unnecessary datastore round-trips to load the Quest.
	DistributorConfigName    string
	DistributorConfigVersion string
	DistributorToken         string

	State dm.Execution_State

	// IsAbnormal is true iff State==ABNORMAL_FINISHED. Used for walk_graph.
	IsAbnormal bool

	// A lazily-updated boolean to reflect that this Execution is expired for
	// queries.
	IsExpired bool

	// Contains either data (State==FINISHED) or abnormal_finish (State==ABNORMAL_FINISHED)
	Result dm.Result `gae:",noindex"`

	// These are DM's internal mechanism for performing timeout actions on
	// Executions.
	//
	// The TimeTo* variables are copied from the quest description.
	//
	// The Timeout is only active when the Execution is in a non-terminal state.
	TimeToStart time.Duration `gae:",noindex"` // timeouts.start
	TimeToRun   time.Duration `gae:",noindex"` // timeouts.run
	TimeToStop  time.Duration `gae:",noindex"` // pollTimeout || timeouts.stop

	// Token is a randomized nonce that's used to verify that RPCs verify from the
	// expected client (the client that's currently running the Execution). The
	// Token has 2 modes.
	//
	// When the Execution is handed to the distributor, the Token is randomly
	// generated by DM and passed to the distributor. The State of the Execution
	// starts as SCHEDULED. This token may be used by the client to "activate" the
	// Execution with the ActivateExecution rpc. At that point, the client
	// provides a new random token, the Execution State moves from SCHEDULED to
	// RUNNING, and Token assumes the new value. As long as the Execution State is
	// RUNNING, the client may continue to use that new Token value to
	// authenticate other rpc's like AddDeps and FinishAttempt.
	//
	// As soon as the Execution is in the STOPPING, ABNORMAL_FINISHED or FINISHED
	// state, this will be nil'd out.
	Token []byte `gae:",noindex"`
}

// MakeExecution makes a new Execution in the SCHEDULING state, with a new
// random Token.
func MakeExecution(c context.Context, e *dm.Execution_ID, cfgName, cfgVers string) *Execution {
	now := clock.Now(c).UTC()
	ret := &Execution{
		ID:      invertedHexUint32(e.Id),
		Attempt: AttemptKeyFromID(c, e.AttemptID()),

		Created:  now,
		Modified: now,

		DistributorConfigName:    cfgName,
		DistributorConfigVersion: cfgVers,

		Token: MakeRandomToken(c, dm.MinimumActivationTokenLength),
	}
	return ret
}

// ModifyState changes the current state of this Execution and updates its
// Modified timestamp.
func (e *Execution) ModifyState(c context.Context, newState dm.Execution_State) error {
	if e.State == newState {
		return nil
	}
	if err := e.State.Evolve(newState); err != nil {
		return err
	}
	now := clock.Now(c).UTC()
	if now.After(e.Modified) {
		e.Modified = now
	} else {
		// Microsecond is the smallest granularity that datastore can store
		// timestamps, so use that to disambiguate: the goal here is that any
		// modification always increments the modified time, and never decrements
		// it.
		e.Modified = e.Modified.Add(time.Microsecond)
	}
	return nil
}

// MakeRandomToken creates a cryptographically random byte slice of the
// specified length. It panics if the specified length cannot be read in full.
func MakeRandomToken(c context.Context, l uint32) []byte {
	rtok := make([]byte, l)
	if _, err := cryptorand.Read(c, rtok); err != nil {
		panic(err)
	}
	return rtok
}

// Revoke will clear the Token and Put this Execution to the datastore. This
// action requires the Execution to be in the RUNNING state, and causes it to
// enter the STOPPING state.
func (e *Execution) Revoke(c context.Context) error {
	e.Token = nil
	if err := e.ModifyState(c, dm.Execution_STOPPING); err != nil {
		return err
	}
	return ds.Put(c, e)
}

func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Execution, err error) {
	a = &Attempt{ID: *eid.AttemptID()}
	e = &Execution{ID: invertedHexUint32(eid.Id), Attempt: ds.KeyForObj(c, a)}
	err = ds.Get(c, a, e)

	if err != nil {
		err = grpcutil.Errf(codes.Internal,
			"couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err)
		return
	}

	if a.CurExecution != uint32(e.ID) {
		err = fmt.Errorf("verifying incorrect execution %d, expected %d", a.CurExecution, e.ID)
		return
	}
	return
}

func verifyExecutionAndCheckExTok(c context.Context, auth *dm.Execution_Auth) (a *Attempt, e *Execution, err error) {
	a, e, err = loadExecution(c, auth.Id)
	if err != nil {
		return
	}

	if a.State != dm.Attempt_EXECUTING {
		err = errors.New("Attempt is not executing")
		return
	}

	if e.State != dm.Execution_RUNNING {
		err = errors.New("Execution is not running")
		return
	}

	if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 {
		err = fmt.Errorf("incorrect Token: %x", hex.EncodeToString(auth.Token))
	}
	return
}

func makeError(err error, msg string) error {
	code := grpcutil.Code(err)
	if code == codes.Unknown {
		code = codes.PermissionDenied
	}
	return grpcutil.Errf(code, msg)
}

// AuthenticateExecution verifies that the Attempt is executing, and that evkey
// matches the execution key of the current Execution for this Attempt.
//
// As a bonus, it will return the loaded Attempt and Execution.
func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt, e *Execution, err error) {
	a, e, err = verifyExecutionAndCheckExTok(c, auth)
	if err != nil {
		logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution")
		err = makeError(err, "requires execution Auth")
	}
	return a, e, err
}

// InvalidateExecution verifies that the execution key is valid, and then
// revokes the execution key.
//
// As a bonus, it will return the loaded Attempt and Execution.
func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt, e *Execution, err error) {
	if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil {
		logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution")
		err = makeError(err, "requires execution Auth")
		return
	}

	err = e.Revoke(c)
	if err != nil {
		logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to revoke execution")
		err = makeError(err, "unable to invalidate Auth")
	}
	return
}

func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actTok []byte) (a *Attempt, e *Execution, err error) {
	a, e, err = loadExecution(c, auth.Id)
	if err != nil {
		return
	}

	if a.State != dm.Attempt_EXECUTING {
		err = errors.New("Attempt is in wrong state")
		return
	}

	switch e.State {
	case dm.Execution_SCHEDULING:
		if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 {
			err = errors.New("incorrect ActivationToken")
			return
		}

		e.State.MustEvolve(dm.Execution_RUNNING)
		e.Token = actTok
		err = ds.Put(c, e)
		logging.Infof(c, "activated execution %s: was SCHEDULING now RUNNING", auth.Id)

	case dm.Execution_RUNNING:
		if subtle.ConstantTimeCompare(e.Token, actTok) != 1 {
			err = errors.New("incorrect Token")
		}
		// either the Token matched, in which case this is simply a retry
		// by the same client, so there's no error, or it's wrong which means it's
		// a retry by a different client.

		logging.Infof(c, "already activated execution %s", auth.Id)

	default:
		err = fmt.Errorf("Execution is in wrong state")
	}
	return
}

// ActivateExecution validates that the execution is unactivated and that
// the activation token matches and then sets the token to the new
// value.
//
// It's OK to retry this. Subsequent invocations with the same Token
// will recognize this case and not return an error.
func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []byte) (a *Attempt, e *Execution, err error) {
	a, e, err = verifyExecutionAndActivate(c, auth, actToken)
	if err != nil {
		logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to activate execution")
		err = makeError(err, "failed to activate execution Auth")
	}
	return a, e, err
}

// GetEID gets an Execution_ID for this Execution. It panics if the Execution
// is in an invalid state.
func (e *Execution) GetEID() *dm.Execution_ID {
	aid := &dm.Attempt_ID{}
	if e.ID == 0 {
		panic("cannot create valid Execution_ID with 0-value ID field")
	}
	if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil {
		panic(err)
	}
	return dm.NewExecutionID(aid.Quest, aid.Id, uint32(e.ID))
}

// ToProto returns a dm proto version of this Execution.
func (e *Execution) ToProto(includeID bool) *dm.Execution {
	ret := &dm.Execution{Data: e.DataProto()}
	if includeID {
		ret.Id = e.GetEID()
	}
	return ret
}

// DataProto returns an Execution.Data message for this Execution.
//
// This omits the DistributorInfo.Url portion, which must be filled in elsewhere for
// package cyclical import reasons.
func (e *Execution) DataProto() (ret *dm.Execution_Data) {
	switch e.State {
	case dm.Execution_SCHEDULING:
		ret = dm.NewExecutionScheduling().Data
	case dm.Execution_RUNNING:
		ret = dm.NewExecutionRunning().Data
	case dm.Execution_STOPPING:
		ret = dm.NewExecutionStopping().Data
	case dm.Execution_FINISHED:
		ret = dm.NewExecutionFinished(e.Result.Data).Data
	case dm.Execution_ABNORMAL_FINISHED:
		ret = dm.NewExecutionAbnormalFinish(e.Result.AbnormalFinish).Data
	default:
		panic(fmt.Errorf("unknown Execution_State: %s", e.State))
	}
	ret.Created = google_pb.NewTimestamp(e.Created)
	ret.Modified = google_pb.NewTimestamp(e.Modified)
	ret.DistributorInfo = &dm.Execution_Data_DistributorInfo{
		ConfigName:    e.DistributorConfigName,
		ConfigVersion: e.DistributorConfigVersion,
		Token:         e.DistributorToken,
	}
	return ret
}
